Solutions/Infoblox/Data Connectors/InfobloxCloudDataConnector/SharedCode/utils.py (948 lines of code) (raw):
"""Utils File."""
import inspect
import requests
import time
import json
import datetime
import re
from .state_manager import StateManager
from azure.storage.fileshare import ShareDirectoryClient
from azure.core.exceptions import ResourceNotFoundError
from .infoblox_exception import InfobloxException
from .logger import applogger
from . import consts
from .sentinel import post_data
from random import randrange
class Utils:
"""Utils Class."""
def __init__(self, azure_function_name) -> None:
"""Init Function."""
self.azure_function_name = azure_function_name
self.log_format = consts.LOG_FORMAT
self.headers = {}
def check_environment_var_exist(self, environment_var):
"""Check the existence of required environment variables.
Logs the validation process and completion. Raises InfobloxException if any required field is missing.
Args:
environment_var(list) : variables to check for existence
"""
__method_name = inspect.currentframe().f_code.co_name
try:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Validating Environment Variables",
)
)
missing_required_field = False
for i in environment_var:
key, val = next(iter(i.items()))
if (val is None) or (val == ""):
missing_required_field = True
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Environment variable {} is not set".format(key),
)
)
if missing_required_field:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Validation failed",
)
)
raise InfobloxException()
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Validation Complete",
)
)
except Exception as err:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(err),
)
)
raise InfobloxException()
def get_checkpoint_data(self, checkpoint_obj: StateManager, load_flag=False):
"""Get checkpoint data from a StateManager object.
It retrieves the checkpoint data and logs it if the load flag is set to True.
Args:
checkpoint_obj (StateManager): The StateManager object to retrieve checkpoint data from.
load_flag (bool): A flag indicating whether to load the data as JSON (default is False).
Returns:
The retrieved checkpoint data.
"""
__method_name = inspect.currentframe().f_code.co_name
try:
checkpoint_data = checkpoint_obj.get()
if load_flag and checkpoint_data:
checkpoint_data = json.loads(checkpoint_data)
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Checkpoint fetch with json.loads",
)
)
return checkpoint_data
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Checkpoint fetch without json.loads",
)
)
return checkpoint_data
except json.decoder.JSONDecodeError as json_error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"JSONDecodeError error : Error-{}".format(json_error),
)
)
raise InfobloxException()
except Exception as err:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(err),
)
)
raise InfobloxException()
def post_checkpoint_data(self, checkpoint_obj: StateManager, data, dump_flag=False):
"""Post checkpoint data.
It posts the data to a checkpoint object based on the dump_flag parameter.
Args:
checkpoint_obj (StateManager): The StateManager object to post data to.
data: The data to be posted.
dump_flag (bool): A flag indicating whether to dump the data as JSON before posting (default is False).
"""
__method_name = inspect.currentframe().f_code.co_name
try:
if dump_flag:
applogger.debug(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Posting data = {}".format(data),
)
)
checkpoint_obj.post(json.dumps(data))
else:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Posting data",
)
)
checkpoint_obj.post(data)
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Data posted to azure storage",
)
)
except TypeError as type_error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Type error : Error-{}".format(type_error),
)
)
raise InfobloxException()
except Exception as err:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(err),
)
)
raise InfobloxException()
def list_file_names_in_file_share(self, parent_dir: ShareDirectoryClient, file_name_prefix):
"""Get list of file names from directory.
Args:
parent_dir (ShareDirectory.from_connection_string): Object of ShareDirectory to perform operations
on file share.
Returns:
list: list of files
"""
__method_name = inspect.currentframe().f_code.co_name
try:
files_list = list(parent_dir.list_directories_and_files(file_name_prefix))
file_names = []
if (len(files_list)) > 0:
for file in files_list:
file_names.append(file["name"])
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Retrieved files for prefix = {}, Total files = {}".format(
file_name_prefix,
len(file_names),
),
)
)
return file_names
except ResourceNotFoundError:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"No storage directory found",
)
)
return None
except Exception as err:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(err),
)
)
raise InfobloxException()
def delete_files_from_azure_storage(self, files_list, parent_dir: ShareDirectoryClient):
"""Delete list of files.
Args:
files_list (list) : list of files to be deleted
parent_dir (ShareDirectory.from_connection_string): Object of ShareDirectory to perform operations
on file share.
"""
__method_name = inspect.currentframe().f_code.co_name
try:
for file_path in files_list:
parent_dir.delete_file(file_path)
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Deleted files = {}".format(len(files_list)),
)
)
except Exception as err:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(err),
)
)
raise InfobloxException()
def filter_file_list(self, file_prefix):
"""
Filter a list of filenames based on a given file prefix.
Args:
file_prefix(str): A string representing the prefix of the filenames to filter.
Returns:
list: A list of filenames that are created before 15 minutes ago.
"""
__method_name = inspect.currentframe().f_code.co_name
try:
parent_file = ShareDirectoryClient.from_connection_string(
conn_str=consts.CONN_STRING,
share_name=consts.FILE_SHARE_NAME_DATA,
directory_path="",
)
filenames = self.list_file_names_in_file_share(parent_file, file_prefix)
if filenames is None:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"No files found",
)
)
return []
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Sort Files by current timestamp",
)
)
sorted_filenames = sorted(filenames, key=self.get_timestamp)
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Filter files created before 15 mins.",
)
)
current_time = int(time.time())
filtered_filenames = [
filename
for filename in sorted_filenames
if ((current_time - self.get_timestamp(filename)) > consts.MAX_FILE_AGE_FOR_INDICATORS)
]
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Filtered File count = {}".format(len(filtered_filenames)),
)
)
return filtered_filenames
except InfobloxException:
raise InfobloxException()
except Exception as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(error),
)
)
raise InfobloxException()
def get_timestamp(self, filename):
"""Get the timestamp from filename.
Args:
filename (str): The name of the file.
Returns:
int: The timestamp of the file.
"""
__method_name = inspect.currentframe().f_code.co_name
try:
return int(filename.split("_")[5])
except IndexError as index_error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Index error : Error-{}".format(index_error),
)
)
raise InfobloxException()
except Exception as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(error),
)
)
raise InfobloxException()
def handle_failed_indicators(self, indicator_list, response_json):
"""Handle failed indicators by writing them to a new file or ingesting them into a log table.
Args:
indicator_list (list): List of indicators.
response_json (dict): JSON response including errors.
"""
__method_name = inspect.currentframe().f_code.co_name
try:
record_indexes = [error.get("recordIndex") for error in response_json["errors"]]
failed_indicators = [indicator_list[index] for index in record_indexes]
# LOGIC TO WRITE FAILED INDICATORS IN NEW FILE
if self.azure_function_name == consts.INDICATOR_FUNCTION_NAME:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Writing Failed Indicators to new file.",
)
)
file_timestamp = time.time()
failed_file_name = "infoblox_failed_tide_indicators_file_{}".format(str(int(file_timestamp)))
failed_indicator_file_obj = StateManager(
consts.CONN_STRING, failed_file_name, consts.FILE_SHARE_NAME_DATA
)
self.post_checkpoint_data(failed_indicator_file_obj, failed_indicators, dump_flag=True)
elif self.azure_function_name == consts.FAILED_INDICATOR_FUNCTION_NAME:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Ingesting Failed Indicators to Log Table.",
)
)
post_data(
body=json.dumps(failed_indicators, ensure_ascii=False),
log_type=consts.FAILED_INDICATORS_TABLE_NAME,
)
except KeyError as keyerror:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Key error : Error-{}".format(keyerror),
)
)
raise InfobloxException()
except InfobloxException:
raise InfobloxException()
except Exception as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(error),
)
)
raise InfobloxException()
def auth_sentinel(self):
"""Authenticate with microsoft sentinel and update header."""
__method_name = inspect.currentframe().f_code.co_name
try:
for i in range(consts.MAX_RETRIES):
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Generating microsoft sentinel access token.",
)
)
azure_auth_url = consts.AZURE_AUTHENTICATION_URL.format(consts.AZURE_TENANT_ID)
applogger.debug(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Calling auth url = {}".format(azure_auth_url),
)
)
body = {
"client_id": consts.AZURE_CLIENT_ID,
"client_secret": consts.AZURE_CLIENT_SECRET,
"grant_type": "client_credentials",
"scope": "https://management.azure.com/.default",
}
try:
response = requests.post(url=azure_auth_url, data=body)
except requests.RequestException as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Request error : Error-{} Index = {}".format(error, i),
)
)
continue
if response.status_code >= 200 and response.status_code <= 299:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Got response with Status code : {}".format(response.status_code),
)
)
response_json = response.json()
bearer_token = self.get_bearer_token_from_response(response_json)
applogger.debug(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Bearer Token Generated: {}".format(bearer_token),
)
)
self.headers = {
"Content-Type": "application/json",
"Authorization": "Bearer {}".format(bearer_token),
}
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"MS authentication complete",
)
)
return
elif response.status_code == 400:
response_json = response.json()
error = response_json.get("error", "Bad request")
error_description = response_json.get("error_description", "")
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Status Code = {}, Error-{}, Error Description = {}".format(
response.status_code,
error,
error_description,
),
)
)
raise InfobloxException()
elif response.status_code == 401:
response_json = response.json()
error = response_json.get("error", "Unauthorized")
error_description = response_json.get("error_description", "")
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Status Code = {}, Error-{}, Error Description = {}".format(
response.status_code,
error,
error_description,
),
)
)
raise InfobloxException()
elif response.status_code == 500:
log_message = "Internal Server Error"
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Status Code = {}, Error-{}".format(response.status_code, log_message),
)
)
raise InfobloxException()
else:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Status Code = {}, Error-{}".format(response.status_code, response.content),
)
)
raise InfobloxException()
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Max retries reached for authentication of sentinel API",
)
)
raise InfobloxException()
except InfobloxException:
raise InfobloxException()
except requests.HTTPError as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.HTTP_ERROR_MSG.format(error),
)
)
raise InfobloxException()
except Exception as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(error),
)
)
raise InfobloxException()
def get_bearer_token_from_response(
self,
json_response,
):
"""Retrieve the bearer token from the JSON response.
Args:
self: The object instance.
json_response: The JSON response containing the access token.
Returns:
string: The bearer token extracted from the JSON response.
"""
__method_name = inspect.currentframe().f_code.co_name
try:
if "access_token" not in json_response:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Access token not found in sentinel api call",
)
)
raise InfobloxException()
else:
bearer_token = json_response.get("access_token")
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Microsoft sentinel access token generated successfully.",
)
)
return bearer_token
except KeyError as keyerror:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Key error : Error-{}".format(keyerror),
)
)
raise InfobloxException()
except Exception as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(error),
)
)
raise InfobloxException()
def send_indicators_to_threat_intelligence(self, indicator_list):
"""Create indicators in sentinel workspace thereat intelligence section.
Return response in json formate if status code is in between [200, 299]
Args:
url (String): URL of the rest call.
method (String): HTTP method of rest call. Eg. "GET", etc.
headers (Dict, optional): headers. Defaults to None.
params (Dict, optional): parameters. Defaults to None.
payload (Type : As required by the rest call, optional): body. Defaults to None.
Returns:
response : response of the rest call.
"""
__method_name = inspect.currentframe().f_code.co_name
try:
for i in range(consts.MAX_RETRIES):
upload_indicator_url = consts.UPLOAD_SENTINEL_INDICATORS_URL.format(consts.WORKSPACE_ID)
applogger.debug(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Calling url: {}".format(upload_indicator_url),
)
)
body = {
"sourcesystem": "Infoblox-TIDE-Threats-Custom",
"value": indicator_list,
}
try:
response = requests.post(
url=upload_indicator_url,
headers=self.headers,
data=json.dumps(body, ensure_ascii=False),
)
except requests.ConnectionError as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"ConnectionError error : Retry = {} : Error-{}".format(i, error),
)
)
time.sleep(randrange(1, 10))
continue
if response.status_code >= 200 and response.status_code <= 299:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Rest Call Completed, Status code : {}".format(response.status_code),
)
)
response_json = response.json()
return response_json
elif response.status_code == 401:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Unauthorized, Status code : {}, Generating new access token, Retry = {}".format(
response.status_code, i
),
)
)
self.auth_sentinel()
continue
elif response.status_code == 429:
message = response.json().get("message", "")
match = re.search(r"Try again in (\d+) seconds", message)
seconds = (int(match.group(1)) + 2) if match else consts.SLEEP_TIME
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Too many requests, Status code : {}, Retry {}, After {} seconds".format(
response.status_code, i, seconds
),
)
)
time.sleep(seconds)
continue
else:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Error while creating indicators, Status code: {}, Error-{}".format(
response.status_code, response.content
),
)
)
raise InfobloxException()
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Max retries exceeded.",
)
)
raise InfobloxException()
except requests.HTTPError as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.HTTP_ERROR_MSG.format(error),
)
)
raise InfobloxException()
except requests.RequestException as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Request error : Error-{}".format(error),
)
)
raise InfobloxException()
except Exception as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(error),
)
)
raise InfobloxException()
def upload_indicator(
self,
indicator_list,
):
"""
Upload indicators to microsoft sentinel.
Args:
azure_function_name (str): Name of the azure function
indicator_list (list): List of indicators to be uploaded
Raises:
InfobloxException: If an error occurs while uploading indicators.
"""
__method_name = inspect.currentframe().f_code.co_name
try:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Uploading Indicators, Length of records : {}".format(len(indicator_list)),
)
)
response_json = self.send_indicators_to_threat_intelligence(indicator_list)
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Checking for error in response",
)
)
if len(response_json.get("errors")) != 0:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Some indicators are failed to create, No. of failed indicators = {}".format(
len(response_json.get("errors"))
),
)
)
self.handle_failed_indicators(indicator_list, response_json)
else:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"No error in Response",
)
)
except InfobloxException:
raise InfobloxException()
except Exception as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(error),
)
)
raise InfobloxException()
def authenticate_infoblox_api(self):
"""Authenticate the Infoblox API."""
__method_name = inspect.currentframe().f_code.co_name
self.headers.update({"Authorization": "Token {}".format(consts.API_TOKEN)})
applogger.debug(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Headers = {}".format(self.headers),
)
)
def add_xh_to_iso_time_string(self, date_time, x):
"""Add x hours to a given ISO formatted date and time string.
Args:
date_time (str): The input date and time string in the format "%Y-%m-%d %H:%M:%S.%f"
x (int): The number of hours to add to the input date and time.
Returns:
str: The new date and time string after adding x hours in the format "%Y-%m-%d %H:%M:%S.%f".
"""
__method_name = inspect.currentframe().f_code.co_name
try:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Before = {}".format(date_time),
)
)
date_time_obj = datetime.datetime.strptime(date_time, consts.DATE_TIME_FORMAT)
date_time_obj = date_time_obj + datetime.timedelta(hours=x)
new_date_time = date_time_obj.strftime(consts.DATE_TIME_FORMAT)[:-3]
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"After = {}".format(new_date_time),
)
)
return new_date_time
except Exception as err:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(err),
)
)
raise InfobloxException()
def iso_to_epoch_str(self, date_time):
"""Convert an ISO formatted date and time string to epoch time.
Args:
date_time (str): The input date and time string in the format "%Y-%m-%d %H:%M:%S.%f"
Returns:
str: The epoch time as a string.
"""
__method_name = inspect.currentframe().f_code.co_name
try:
date_time_obj = datetime.datetime.strptime(date_time, consts.DATE_TIME_FORMAT)
epoch_time = int(date_time_obj.timestamp())
return str(epoch_time)
except (TypeError, ValueError) as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Type/Value error : Error-{}".format(error),
)
)
raise InfobloxException()
except Exception as err:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(err),
)
)
raise InfobloxException()
def make_dossier_call(self, url, method, headers, params=None, body=None):
"""Make a dossier call.
Args:
url (str): The URL to make the call to.
method (str): The HTTP method to use for the call.
headers (dict): The headers to include in the request.
params (dict, optional): The parameters to pass in the call (default is None).
body (dict, optional): The body of the request (default is None).
Returns:
dict: The JSON response if the call is successful.
"""
__method_name = inspect.currentframe().f_code.co_name
try:
for i in range(consts.MAX_RETRIES):
applogger.debug(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Calling url: {}".format(url),
)
)
response = requests.request(
method,
url,
headers=headers,
params=params,
json=body,
)
if response.status_code >= 200 and response.status_code <= 299:
applogger.info(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Rest Call Completed, Status code : {}".format(response.status_code),
)
)
response_json = response.json()
return response_json
elif response.status_code == 500:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Internal Server Error, Status code : {}".format(response.status_code),
)
)
raise InfobloxException()
elif response.status_code == 429:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Too many requests, Status code : {}, Retrying... {}".format(response.status_code, i),
)
)
time.sleep(randrange(1, 10))
continue
elif response.status_code == 404:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Not found, Status code : {}".format(response.status_code),
)
)
raise InfobloxException()
else:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Error while creating job, Status code: {}, Error-{}".format(
response.status_code, response.content
),
)
)
raise InfobloxException()
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Max retries exceeded.",
)
)
raise InfobloxException()
except requests.ConnectionError as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Connection error : Error-{}".format(error),
)
)
raise InfobloxException()
except requests.HTTPError as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.HTTP_ERROR_MSG.format(error),
)
)
raise InfobloxException()
except requests.RequestException as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
"Request error : Error-{}".format(error),
)
)
raise InfobloxException()
except Exception as error:
applogger.error(
self.log_format.format(
consts.LOGS_STARTS_WITH,
__method_name,
self.azure_function_name,
consts.UNEXPECTED_ERROR_MSG.format(error),
)
)
raise InfobloxException()